kafka rebalance机制与Consumer多种消费模式案例
rebalance 何时触发?到底干嘛?流程如何?
reblance 何时触发
- 组订阅发生变更,比如基于正则表达式订阅,当匹配到新的topic创建时,组的订阅就会发生变更。
- 组的topic分区数发生变更,通过命令行脚本增加了订阅topic的分区数。
- 组成员发生变更:新加入组以及离开组。
reblance 到底干嘛
一句话:多个Consumer订阅了一个Topic时,根据分区策略进行消费者订阅分区的重分配
Coordinator 到底在那个Broker
找到Coordinator的算法 与 找到_consumer_offsets目标分区的算法是一致的。
- 第一步:确定目标分区:Math.abs(groupId.hashCode)%50,假设是12。
- 第二步:找到_consumer_offsets分区为10的Leader副本所在的Broker,那么该broker即为Group Coordinator。
reblance 流程如何
reblance 流程流程整体如下图所示,值得强调的几点如下:
Coordinator的角色由Broker端担任。
Group Leader 的角色主要有Consumer担任。
加入组请求(JoinGroup)=>作用在于选择Group Leader。
同步组请求(SyncGroup)=>作用在于确定分区分配方案给Coordinator,把方案响应给所有Consumer。
reblance 机制的好处
- 分区分配权利下放给客户端consumer,因此系统不用重启,既可以实现分区策略的变更。
- 用户可以自行实现机架感知分配方案。
reblance generation 过滤无用请求
- kafka引入 reblance generation ,就是为了防止Consumer group的无效Offset提交。若因为某些原因,consumer延迟提交了Offset,而该consumer被踢出了消费组,那么该Consumer再次提交位移时,携带的就是旧的generation了。
reblance 监听器应用级别实战
reblance 监听器解决用户 把位移提交到外部存储的情况,在监听器中实现位移保存和位移的重定向。
onPartitionsRevoked : rebalance开启新一轮的重平衡前会调用,一般用于手动提交位移,及审计功能
onPartitionsAssigned :rebalance在重平衡结束后会调用,一般用于消费逻辑处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
统计rebalance总时长
final AtomicLong totalRebalanceTimeMs =new AtomicLong(0L)
统计rebalance开始时刻
final AtomicLong rebalanceStart =new AtomicLong(0L)
1 重平衡监听
consumer.subscribe(Arrays.asList("test-topic"), new ConsumerRebalanceListener(){
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for(TopicPartition tp : partitions){
1 保存到外部存储
saveToExternalStore(consumer.position(tp))
2 手动提交位移
//consumer.commitSync(toCommit);
}
rebalanceStart.set(System.currentTimeMillis())
}
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
totalRebalanceTimeMs.addAndGet(System.currentTimeMillis()-rebalanceStart.get())
for (TopicPartition tp : partitions) {
consumer.seek(tp,readFromExternalStore(tp))
}
}
});
2 消息处理
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
Consumer组内消息均衡实战
Consumer 单线程封装,实现多个消费者来消费(浪费资源)
实例主题:
- ConsumerGroup 实现组封装
- ConsumerRunnable 每个线程维护私有的KafkaConsumer实例
1 | public class Main { |
1 | import java.util.ArrayList; |
1 | import org.apache.kafka.clients.consumer.ConsumerRecord; |
一个Consumer,内部实现多线程消费(consumer压力过大)
实例主题:
- ConsumerHandler 单一的Consumer实例,poll后里面会跑一个线程池,执行多个Processor线程来处理
- Processor 业务逻辑处理方法
进一步优化建议;
- ConsumerHandler 设置手动提交位移,负责最终位移提交consumer.commitSync();。
- ConsumerHandler设置一个全局的Map<TopicPartion,OffsetAndMetadata> offsets,来管理Processor消费的位移。
- Processor 负责批处理完消息后,得到消息的最大位移,并更新offsets数组
- ConsumerHandler 根据 offsets,位移提交后会清空offsets集合。
- ConsumerHandler设置重平衡监听
1 | public class Main { |
1 | import java.util.Arrays; |
1 | import org.apache.kafka.clients.consumer.ConsumerRecord; |
方案对比
- 第一种方案:建议采用 Consumer 单线程封装,实现多个消费者来消费(浪费资源),这样能很好地保证分区内消费的顺序,同时也没有线程切换的开销。
- 第二种方案:实现复杂,问题在于可能无法维护分区内的消息顺序,注意消息处理和消息接收解耦了。
Consumer指定分区消费案例实战(Standalone Consumer)
Standalone Consumer assign 用于接收指定分区列表的消息和Subscribe是矛盾的。只能二选一。
多个 Consumer 实例消费一个 Topic 借助于 group reblance可谓是天作之合。
若要精准控制,assign逃不了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35poperties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
List<TopicPartion> partitions = new ArrayList<>();
List<PartitionInfo> allPartitions = consumer.partitionsFor("kaiXinTopic")
if(allPartitions != null && !allPartitions.isEmpty){
for(PartitionInfo partitionInfo : allPartitions){
partitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()))
}
consumer.assign(partitions)
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
- 本文链接:https://gjtmaster.github.io/2018/09/17/kafka rebalance 机制与Consumer多种消费模式案例应用实战/
- 版权声明:The author owns the copyright, please indicate the source reproduced.